# Bring Your Own Model

## Introduction

MindsDB lets you integrate your own machine learning models.

To do this, your model needs an API wrapper. For now, we support two API specifications: [MLflow](https://www.mlflow.org) and [Ray Serve](https://docs.ray.io/en/latest/serve/index.html).

The former supports importing already trained models and predicting with them from MindsDB. The latter supports both training and predicting with external models.

To use custom models, there are three mandatory arguments you must pass inside the `USING` statement:
- `url.predict`, this is the url to call for getting predictions from your model
- `format`, this can be either `mlflow` or `ray_server`
- `dtype_dict`, this is a JSON specifying all columns expected by your model, and their respective data types. For now, the mapping supports data types used by [lightwood](https://mindsdb.github.io/lightwood/api/dtype.html), our AutoML library.

There's an additional optional argument if you want to train the model via MindsDB (only for Ray Serve):
- `url.train`, which is the endpoint that will be called to train your model
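
As a rough illustration of how these arguments fit together, the sketch below renders a `USING` clause from Python values. The helper name `render_using_clause` is hypothetical and not part of MindsDB; it only assembles text in the same shape as the queries shown on this page.

```python
import json


def render_using_clause(url_predict, fmt, dtype_dict, url_train=None):
    """Render a USING clause (hypothetical helper, not a MindsDB API)."""
    if url_train is not None and fmt == 'mlflow':
        # Training through MindsDB is only available for Ray Serve models.
        raise ValueError('url.train is not supported for MLflow models')
    lines = ['USING']
    if url_train is not None:
        lines.append(f"url.train = '{url_train}',")
    lines.append(f"url.predict = '{url_predict}',")
    # dtype_dict is written as a JSON object: column name -> data type
    lines.append(f"dtype_dict={json.dumps(dtype_dict)},")
    lines.append(f"format='{fmt}';")
    return '\n'.join(lines)


print(render_using_clause(
    'http://localhost:5000/invocations',
    'mlflow',
    {'0': 'integer', '1': 'integer'},
))
```

The rendered text can be appended to a `CREATE PREDICTOR` statement; the column names and types here are placeholders.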

* * *

## 1. Ray Serve

### 1.1 Simple example - Logistic regression

Ray Serve is a simple, high-throughput service that can wrap your own ML models. In this example, we will train and predict with an external scikit-learn model. First, let's look at the actual model wrapped inside a class that complies with the above requirements:

```python
import ray
from fastapi import Request, FastAPI
from ray import serve
import time
import pandas as pd
import json
from sklearn.linear_model import LogisticRegression


app = FastAPI()
ray.init()
serve.start(detached=True)


async def parse_req(request: Request):
    data = await request.json()
    target = data.get('target', None)
    di = json.loads(data['df'])
    df = pd.DataFrame(di)
    return df, target


@serve.deployment(route_prefix="/my_model")
@serve.ingress(app)
class MyModel:
    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)
        feature_cols = list(set(list(df.columns)) - set([target]))
        self.feature_cols = feature_cols
        X = df.loc[:, self.feature_cols]
        Y = list(df[target])
        self.model = LogisticRegression()
        self.model.fit(X, Y)
        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        df, _ = await parse_req(request)
        X = df.loc[:, self.feature_cols]
        predictions = self.model.predict(X)
        pred_dict = {'prediction': [float(x) for x in predictions]}
        return pred_dict


MyModel.deploy()

while True:
    time.sleep(1)
```

The important bits here are the `train` and `predict` endpoints.

The `train` endpoint accepts two parameters in the JSON sent via POST:
- `df` -- a serialized dictionary that can be converted into a pandas dataframe
- `target` -- the name of the target column

The `predict` endpoint needs only one parameter:
- `df` -- a serialized dictionary that can be converted into a pandas dataframe


The `train` endpoint must return a JSON object with the key `status` set to `ok`. The `predict` endpoint must return a dictionary containing the `prediction` key, which stores the predictions. Additional keys can be returned for confidence and confidence intervals.
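
To smoke-test a wrapper against this contract before connecting it to MindsDB, something like the following sketch may help. The helper names are hypothetical, the list-of-records layout for `df` is an assumption (any layout that `pd.DataFrame` accepts would do), and the one-prediction-per-row check is likewise an assumption rather than a documented rule.

```python
import json


def build_request_body(rows, target=None):
    """Build the POST body: `df` is itself a JSON string, as parse_req expects."""
    body = {'df': json.dumps(rows)}
    if target is not None:
        body['target'] = target  # only needed by the train endpoint
    return body


def check_train_response(resp):
    """Raise if the train endpoint did not report success."""
    if resp.get('status') != 'ok':
        raise ValueError(f'unexpected train response: {resp!r}')


def check_predict_response(resp, n_rows):
    """Return the predictions, assuming one prediction per input row."""
    preds = resp.get('prediction')
    if not isinstance(preds, list) or len(preds) != n_rows:
        raise ValueError(f'unexpected predict response: {resp!r}')
    return preds


rows = [
    {'initial_price': 3000, 'rental_price': 3100, 'number_of_rooms': 2},
    {'initial_price': 1500, 'rental_price': 1600, 'number_of_rooms': 1},
]
train_body = build_request_body(rows, target='number_of_rooms')
predict_body = build_request_body(rows)
```

Each body can then be sent with any HTTP client, for example `requests.post(url, json=train_body)`, and the decoded JSON reply passed to the matching check.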

Once you start this Ray Serve-wrapped model, you can train it using a query like this one:

```sql
CREATE PREDICTOR mindsdb.byom_ray_serve
FROM mydb (
    SELECT number_of_rooms, initial_price, rental_price 
    FROM test_data.home_rentals
) 
PREDICT number_of_rooms
USING
url.train = 'http://127.0.0.1:8000/my_model/train',
url.predict = 'http://127.0.0.1:8000/my_model/predict',
dtype_dict={"number_of_rooms": "categorical", "initial_price": "integer", "rental_price": "integer"},
format='ray_server';
```

And you can query predictions as usual, either by conditioning on a subset of input columns:

```sql
SELECT * FROM byom_ray_serve WHERE initial_price=3000 AND rental_price=3000;
```

Or by using `JOIN` to do batch predictions:

```sql
SELECT tb.number_of_rooms, t.rental_price FROM mydb.test_data.home_rentals AS t JOIN mindsdb.byom_ray_serve AS tb WHERE t.rental_price > 5300;
```

*Please note that if your model is behind a reverse proxy (e.g. nginx), you might have to increase the maximum limit for POST requests in order to receive the training data. MindsDB itself can send as much as you'd like and has been stress-tested with over a billion rows.*
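
To pick a sensible limit (for nginx, the relevant directive is `client_max_body_size`), it can help to estimate how large the training request will be. The sketch below is only an approximation: the helper name is hypothetical, and it assumes the body has the `df` and `target` keys described above, with `df` serialized as a list of records.

```python
import json


def estimate_payload_bytes(rows, target):
    """Approximate size in bytes of a training request body."""
    body = json.dumps({'df': json.dumps(rows), 'target': target})
    return len(body.encode('utf-8'))


# One representative row, repeated to get an average per-row cost.
sample = [{'number_of_rooms': 2, 'initial_price': 3000, 'rental_price': 3100}]
per_row = estimate_payload_bytes(sample * 1000, 'number_of_rooms') / 1000
print(f'~{per_row:.0f} bytes per row')
```

Multiplying the per-row figure by the number of training rows gives a rough lower bound for the proxy limit.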

### 1.2. Example - Keras NLP model

For this example, we will consider a natural language processing (NLP) [task](https://www.kaggle.com/c/nlp-getting-started) where we want to train a neural network with [Keras](https://keras.io) to detect if a tweet is related to a natural disaster (fires, earthquakes, etc.). Please download this dataset to follow the example.

The code for the model here is a bit more complex than in section 1.1, but the same rules apply: we create a Ray Serve-based service that wraps around a [Kaggle NLP Model](https://www.kaggle.com/shahules/basic-eda-cleaning-and-glove) which can be trained and then used for predictions:

```python
import re
import time
import json
import string

import ray
from ray import serve

import numpy as np
import pandas as pd
from tqdm import tqdm
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from fastapi import Request, FastAPI
from sklearn.model_selection import train_test_split

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, SpatialDropout1D
from tensorflow.keras.initializers import Constant
from tensorflow.keras.optimizers import Adam

app = FastAPI()
stop = set(stopwords.words('english'))


async def parse_req(request: Request):
    data = await request.json()
    target = data.get('target', None)
    di = json.loads(data['df'])
    df = pd.DataFrame(di)
    return df, target


@serve.deployment(route_prefix="/nlp_kaggle_model")
@serve.ingress(app)
class Model:
    MAX_LEN = 100
    GLOVE_DIM = 50
    EPOCHS = 10

    def __init__(self):
        self.model = None

    @app.post("/train")
    async def train(self, request: Request):
        df, target = await parse_req(request)

        target_arr = df.pop(target).values
        df = self.preprocess_df(df)
        train_corpus = self.create_corpus(df)

        # load the pre-trained GloVe vectors (the file is closed by the `with` block)
        self.embedding_dict = {}
        with open('./glove.6B.50d.txt', 'r', encoding='utf-8') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vectors = np.asarray(values[1:], 'float32')
                self.embedding_dict[word] = vectors

        # fit the tokenizer; it is kept on the instance for use at prediction time
        self.tokenizer_obj = Tokenizer()
        self.tokenizer_obj.fit_on_texts(train_corpus)

        sequences = self.tokenizer_obj.texts_to_sequences(train_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        word_index = self.tokenizer_obj.word_index
        num_words = len(word_index) + 1
        embedding_matrix = np.zeros((num_words, self.__class__.GLOVE_DIM))

        # fill the embedding matrix; words without a GloVe vector stay at zero
        for word, i in tqdm(word_index.items()):
            if i > num_words:
                continue

            emb_vec = self.embedding_dict.get(word)
            if emb_vec is not None:
                embedding_matrix[i] = emb_vec

        self.model = Sequential()
        embedding = Embedding(num_words,
                              self.__class__.GLOVE_DIM,
                              embeddings_initializer=Constant(embedding_matrix),
                              input_length=self.__class__.MAX_LEN,
                              trainable=False)
        self.model.add(embedding)
        self.model.add(SpatialDropout1D(0.2))
        self.model.add(LSTM(64, dropout=0.2, recurrent_dropout=0.2))
        self.model.add(Dense(1, activation='sigmoid'))

        optimizer = Adam(learning_rate=1e-5)
        self.model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        X_train, X_test, y_train, y_test = train_test_split(df, target_arr, test_size=0.15)
        self.model.fit(X_train, y_train, batch_size=4, epochs=self.__class__.EPOCHS, validation_data=(X_test, y_test), verbose=2)

        return {'status': 'ok'}

    @app.post("/predict")
    async def predict(self, request: Request):
        df, _ = await parse_req(request)

        df = self.preprocess_df(df)
        test_corpus = self.create_corpus(df)

        sequences = self.tokenizer_obj.texts_to_sequences(test_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=self.__class__.MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()
        sub = pd.DataFrame({'target': y_pre})

        pred_dict = {'prediction': [float(x) for x in sub['target'].values]}
        return pred_dict

    def preprocess_df(self, df):
        # work on a copy so the caller's dataframe is not modified
        df = df[['text']].copy()
        df['text'] = df['text'].apply(self.remove_URL)
        df['text'] = df['text'].apply(self.remove_html)
        df['text'] = df['text'].apply(self.remove_emoji)
        df['text'] = df['text'].apply(self.remove_punct)
        return df

    def remove_URL(self, text):
        url = re.compile(r'https?://\S+|www\.\S+')
        return url.sub(r'', text)

    def remove_html(self, text):
        html = re.compile(r'<.*?>')
        return html.sub(r'', text)

    def remove_punct(self, text):
        table = str.maketrans('', '', string.punctuation)
        return text.translate(table)

    def remove_emoji(self, text):
        emoji_pattern = re.compile("["
                                   u"\U0001F600-\U0001F64F"  # emoticons
                                   u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                                   u"\U0001F680-\U0001F6FF"  # transport & map symbols
                                   u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                                   u"\U00002702-\U000027B0"
                                   u"\U000024C2-\U0001F251"
                                   "]+", flags=re.UNICODE)
        return emoji_pattern.sub(r'', text)

    def create_corpus(self, df):
        # keep lowercase alphabetic tokens that are not stopwords
        corpus = []
        for tweet in tqdm(df['text']):
            words = [word.lower() for word in word_tokenize(tweet) if word.isalpha() and word not in stop]
            corpus.append(words)
        return corpus


if __name__ == '__main__':
    ray.init()
    serve.start(detached=True)

    Model.deploy()

    while True:
        time.sleep(1)
```

We need access to the training data, so we'll load the [dataset](https://www.kaggle.com/c/nlp-getting-started) that the original model uses into a table called `nlp_kaggle_train` with the following schema:

```sql
id INT,
keyword VARCHAR(255),
location VARCHAR(255),
text VARCHAR(5000),
target INT
```

*Note: specifics of the schema and how to ingest the csv will vary depending on your database.*

Next, we can register and train the above custom model using the following query:

```sql
CREATE PREDICTOR mindsdb.byom_ray_serve_nlp
FROM maria (
    SELECT text, target
    FROM test.nlp_kaggle_train
) PREDICT target
USING
url.train = 'http://127.0.0.1:8000/nlp_kaggle_model/train',
url.predict = 'http://127.0.0.1:8000/nlp_kaggle_model/predict',
dtype_dict={"text": "rich_text", "target": "integer"},
format='ray_server';
```

Training will take a while given that this model is a neural network rather than a simple logistic regression. You can check its status with the query `SELECT * FROM mindsdb.predictors WHERE name = 'byom_ray_serve_nlp';`, much like you'd do with a "normal" MindsDB predictor.
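
If you would rather wait for training from a script, a small polling loop is one option. The sketch below is not a MindsDB API: `get_status` is a hypothetical callable that you supply (for example, one that runs the query above through your SQL client and returns the status column), and treating `error` as the failure status is an assumption.

```python
import time


def wait_until_trained(get_status, timeout=3600.0, interval=5.0, sleep=time.sleep):
    """Poll get_status() until it returns 'trained', or fail on error/timeout."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == 'trained':
            return status
        if status == 'error':  # assumed failure status
            raise RuntimeError('predictor training failed')
        if time.monotonic() >= deadline:
            raise TimeoutError(f'still {status!r} after {timeout} seconds')
        sleep(interval)


# Stand-in for a real status query, only to show the call shape.
fake_statuses = iter(['training', 'training', 'trained'])
print(wait_until_trained(lambda: next(fake_statuses), sleep=lambda seconds: None))
```

The `sleep` parameter exists only so the loop can be exercised without real delays.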

Once the predictor's status becomes `trained` we can query it for predictions as usual:

```sql
SELECT * FROM mindsdb.byom_ray_serve_nlp WHERE text='The tsunami is coming, seek high ground';
```

This should, hopefully, output `1`. Alternatively, we can try this tweet and expect `0` as the output:

```sql
SELECT * FROM mindsdb.byom_ray_serve_nlp WHERE text='This is lovely dear friend';
```

If your results do not match this example, it could help to train the model for more epochs.

* * *

## 2. MLflow

### 2.1 Simple example - Logistic Regression

MLflow is a tool that you can use to train and serve models, among other features like organizing experiments, tracking metrics, etc.

Given there is no way to train an MLflow-wrapped model using its API, you will have to train your models outside of MindsDB by pulling your data manually (i.e. with a script), ideally using an MLflow run or experiment.

The first step would be to create a script where you train a model and save it using one of the saving methods that MLflow exposes. For this example, we will use the model in this [simple tutorial](https://github.com/mlflow/mlflow#saving-and-serving-models) where the method is `mlflow.sklearn.log_model` ([here](https://github.com/mlflow/mlflow/blob/9781af9c0898827bf616a8f159168477a69036dd/examples/sklearn_logistic_regression/train.py#L15)), given that the model is built with scikit-learn.

Once trained, you need to make sure the model is served and listening for input at a URL of your choice (note that this means your model can run on a different machine than the one executing MindsDB). Let's assume this URL to be `http://localhost:5000/invocations` for now.

This means you would execute the following command in your terminal, from the directory where the model was stored:

`mlflow models serve --model-uri runs:/<run-id>/model`

With `<run-id>` given in the output of the command `python train.py` used for actually training the model.
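
Before linking the model to MindsDB, you may want to check that the endpoint answers at all. The sketch below assumes an MLflow 2.x scoring server, which accepts a JSON body with a `dataframe_records` key; older 1.x servers expect a different pandas-oriented payload, so adjust accordingly. The helper names and the sample row are hypothetical.

```python
import json
import urllib.request

URL = 'http://localhost:5000/invocations'  # URL assumed in this section


def build_invocations_request(rows, url=URL):
    """Build a POST request in the MLflow 2.x `dataframe_records` format."""
    body = json.dumps({'dataframe_records': rows}).encode('utf-8')
    return urllib.request.Request(
        url,
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def invoke(rows, url=URL):
    """Send the rows to the served model and return the decoded JSON reply."""
    with urllib.request.urlopen(build_invocations_request(rows, url)) as resp:
        return json.loads(resp.read())


request = build_invocations_request([{'0': 2}])
print(request.full_url, request.data.decode('utf-8'))
```

Calling `invoke([{'0': 2}])` performs the actual request and requires the model server to be running.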

Next, we're going to bring this model into MindsDB:

```sql
CREATE PREDICTOR mindsdb.byom_mlflow
PREDICT `1`  -- `1` is the target column name
USING
url.predict='http://localhost:5000/invocations',
format='mlflow',
dtype_dict={"0": "integer", "1": "integer"};

We can now run predictions as usual, by using the `WHERE` clause or joining on a data table with an appropriate schema:

```sql
SELECT `1` FROM byom_mlflow WHERE `0`=2;
```


### 2.2. Advanced example - Keras NLP model

This is the same use case as in section 1.2, so be sure to download the dataset to reproduce the steps here. In this case, we will take a look at the best practices when your model needs custom data preprocessing code (which, realistically, will be fairly common).

The key difference is that we now need to use the `mlflow.pyfunc` module to both 1) save the model using `mlflow.pyfunc.save_model` and 2) subclass `mlflow.pyfunc.PythonModel` to wrap the model in an MLflow-compatible way that will enable our custom inference logic to be called.

### Saving the model

In the same script where you train the model (which you can find in the final section of 2.2) there should be a call at the end where you actually use MLflow to save every produced artifact:

```python
mlflow.pyfunc.save_model(
    path="nlp_kaggle",
    python_model=Model(),
    conda_env=conda_env,
    artifacts=artifacts
)
```

Here, `artifacts` is a dictionary with all the outputs produced by the training phase. In this case, we want both a model and a tokenizer to preprocess the input text. On the other hand, `conda_env` specifies the environment under which your model should be executed once served in a self-contained conda environment, so it should include all required packages and dependencies. For this example, they look like this:

```python
# these will be accessible inside the Model() wrapper
artifacts = {
    'model': model_path,
    'tokenizer_path': tokenizer_path,
}

# specs for environment that will be created when serving the model
conda_env = {
    'name': 'nlp_keras_env',
    'channels': ['defaults'],
    'dependencies': [
        'python=3.8',
        'pip',
        {
            'pip': [
                'mlflow',
                'tensorflow',
                'cloudpickle',
                'nltk',
                'pandas',
                'numpy',
                'scikit-learn',
                'tqdm',
            ],
        },
    ],
}
```

Finally, to actually store the model you need to provide the wrapper class that will 1) load all produced artifacts into an accessible "context" and 2) implement all required inference logic:

```python
class Model(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # we use paths in the context to load everything
        self.model_path = context.artifacts['model']
        self.model = load_model(self.model_path)
        with open(context.artifacts['tokenizer_path'], 'rb') as f:
            self.tokenizer = pickle.load(f)

    def predict(self, context, model_input):
        # preprocess input, tokenize, pad, and call the model
        df = preprocess_df(model_input)
        corpus = create_corpus(df)
        sequences = self.tokenizer.texts_to_sequences(corpus)
        tweet_pad = pad_sequences(sequences, 
                                  maxlen=MAX_LEN, 
                                  truncating='post', 
                                  padding='post')
        df = tweet_pad[:df.shape[0]]

        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()

        return list(y_pre)
```

As you can see, here we are loading multiple artifacts and using them to guarantee the input data will be in the same format that was used when training. Ideally, you would abstract this even further into a single `preprocess` method that is called both at training time and inference time.
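
One way such a shared step could look is sketched below. It is a deliberately simplified stand-in, not the pipeline used in this guide: it splits on whitespace instead of calling `word_tokenize`, skips stopword and emoji removal, and pads with plain Python lists rather than `pad_sequences`. The `texts_to_sequences` argument is any callable with the same shape as the Keras tokenizer method of that name.

```python
import re
import string

URL_RE = re.compile(r'https?://\S+|www\.\S+')
HTML_RE = re.compile(r'<.*?>')
PUNCT_TABLE = str.maketrans('', '', string.punctuation)


def clean_text(text):
    """Strip URLs, HTML tags and punctuation from a single string."""
    text = URL_RE.sub('', text)
    text = HTML_RE.sub('', text)
    return text.translate(PUNCT_TABLE)


def preprocess(texts, texts_to_sequences, max_len):
    """Clean, tokenize and post-pad/post-truncate texts to max_len."""
    corpus = [
        [word.lower() for word in clean_text(text).split() if word.isalpha()]
        for text in texts
    ]
    sequences = texts_to_sequences(corpus)
    # truncate at the end, then pad at the end with zeros
    return [seq[:max_len] + [0] * (max_len - len(seq)) for seq in sequences]


# Toy vocabulary standing in for a fitted tokenizer.
vocab = {'fire': 1, 'at': 2, 'now': 3}


def toy_texts_to_sequences(corpus):
    return [[vocab[w] for w in words if w in vocab] for words in corpus]


padded = preprocess(['Fire at https://t.co/abc <b>now</b>!!!'], toy_texts_to_sequences, 5)
```

Calling the same `preprocess` function from both the training script and `predict` keeps the two code paths from drifting apart.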

Finally, serving is simple. Go to the directory where you called the above script, and execute `mlflow models serve --model-uri ./nlp_kaggle`.

At this point, the rest is essentially the same as in the previous example. You can link the MLflow model with these SQL statements:

```sql
CREATE PREDICTOR mindsdb.byom_mlflow_nlp
PREDICT `target`
USING 
    url.predict='http://localhost:5000/invocations',
    format='mlflow',
    dtype_dict={"text": "rich_text", "target": "binary"};
```

To get predictions, you can directly pass input data using the `WHERE` clause:

```sql
SELECT target
FROM mindsdb.byom_mlflow_nlp
WHERE text='The tsunami is coming, seek high ground';
```

Or you can `JOIN` with a data table. For this, you should ensure the table actually exists and that the database it belongs to has been connected to your MindsDB instance. For more details, refer to the same steps in the Ray Serve example (section 1.2).

```sql
SELECT
    ta.text,
    tb.target as predicted
FROM db_byom.test.nlp_kaggle_test as ta
JOIN mindsdb.byom_mlflow_nlp as tb;
```

### Full Script

Finally, for reference, here's the full script that trains and saves the model. The model is exactly the same as in section 1.2, so it may seem familiar.

```python
import re
import pickle
import string

import mlflow.pyfunc

import nltk
import tqdm
import sklearn
import tensorflow
import cloudpickle
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from sklearn.model_selection import train_test_split
from tensorflow.keras.initializers import Constant
from tensorflow.keras.layers import Embedding, LSTM, Dense, SpatialDropout1D
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.models import load_model


stop = set(stopwords.words('english'))

MAX_LEN = 100
GLOVE_DIM = 50
EPOCHS = 10


def preprocess_df(df):
    # work on a copy so the caller's dataframe is not modified
    df = df[['text']].copy()
    funcs = [remove_URL, remove_html, remove_emoji, remove_punct]
    for fn in funcs:
        df['text'] = df['text'].apply(fn)
    return df


def remove_URL(text):
    url = re.compile(r'https?://\S+|www\.\S+')
    return url.sub(r'', text)


def remove_html(text):
    html = re.compile(r'<.*?>')
    return html.sub(r'', text)


def remove_punct(text):
    table = str.maketrans('', '', string.punctuation)
    return text.translate(table)


def remove_emoji(text):
    emoji_pattern = re.compile("["
                               u"\U0001F600-\U0001F64F"  # emoticons
                               u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                               u"\U0001F680-\U0001F6FF"  # transport & map symbols
                               u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                               u"\U00002702-\U000027B0"
                               u"\U000024C2-\U0001F251"
                               "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', text)


def create_corpus(df):
    # keep lowercase alphabetic tokens that are not stopwords
    corpus = []
    for tweet in tqdm.tqdm(df['text']):
        words = [word.lower() for word in word_tokenize(tweet) if word.isalpha() and word not in stop]
        corpus.append(words)
    return corpus


class Model(mlflow.pyfunc.PythonModel):

    def load_context(self, context):

        self.model_path = context.artifacts['model']
        with open(context.artifacts['tokenizer_path'], 'rb') as f:
            self.tokenizer = pickle.load(f)
        self.model = load_model(self.model_path)

    def predict(self, context, model_input):

        df = preprocess_df(model_input)
        corpus = create_corpus(df)
        sequences = self.tokenizer.texts_to_sequences(corpus)
        tweet_pad = pad_sequences(sequences, maxlen=MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        y_pre = self.model.predict(df)
        y_pre = np.round(y_pre).astype(int).flatten().tolist()

        return list(y_pre)


if __name__ == '__main__':
    train_model = True

    model_path = './'
    tokenizer_path = './tokenizer.pkl'
    run_name = 'test_run'
    mlflow_pyfunc_model_path = "nlp_kaggle"
    mlflow.set_tracking_uri("sqlite:///mlflow.db")

    if train_model:

        # preprocess data
        df = pd.read_csv('./train.csv')
        target = df[['target']]
        target_arr = target.values
        df = preprocess_df(df)
        train_corpus = create_corpus(df)

        # load embeddings (the file is closed by the `with` block)
        embedding_dict = {}
        with open('./glove.6B.50d.txt', 'r', encoding='utf-8') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vectors = np.asarray(values[1:], 'float32')
                embedding_dict[word] = vectors

        # generate and save tokenizer
        tokenizer_obj = Tokenizer()
        tokenizer_obj.fit_on_texts(train_corpus)

        with open(tokenizer_path, 'wb') as f:
            pickle.dump(tokenizer_obj, f)

        # tokenize and pad
        sequences = tokenizer_obj.texts_to_sequences(train_corpus)
        tweet_pad = pad_sequences(sequences, maxlen=MAX_LEN, truncating='post', padding='post')
        df = tweet_pad[:df.shape[0]]

        word_index = tokenizer_obj.word_index
        num_words = len(word_index) + 1
        embedding_matrix = np.zeros((num_words, GLOVE_DIM))

        # fill embedding matrix
        for word, i in tqdm.tqdm(word_index.items()):
            if i > num_words:
                continue

            emb_vec = embedding_dict.get(word)
            if emb_vec is not None:
                embedding_matrix[i] = emb_vec

        X_train, X_test, y_train, y_test = train_test_split(df, target_arr, test_size=0.15)

        # generate model
        model = Sequential()
        embedding = Embedding(num_words,
                              GLOVE_DIM,
                              embeddings_initializer=Constant(embedding_matrix),
                              input_length=MAX_LEN,
                              trainable=False)
        model.add(embedding)
        model.add(SpatialDropout1D(0.2))
        model.add(LSTM(64, dropout=0.2, recurrent_dropout=0.2))
        model.add(Dense(1, activation='sigmoid'))

        optimizer = Adam(learning_rate=1e-5)
        model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        # train and save
        model.fit(X_train, y_train, batch_size=4, epochs=EPOCHS, validation_data=(X_test, y_test), verbose=2)
        model.save(model_path)

    # save in mlflow format
    artifacts = {
        'model': model_path,
        'tokenizer_path': tokenizer_path,
    }

    conda_env = {
        'channels': ['defaults'],
        'dependencies': [
            'python=3.8',
            'pip',
            {
                'pip': [
                    'mlflow',
                    'tensorflow=={}'.format(tensorflow.__version__),
                    'cloudpickle=={}'.format(cloudpickle.__version__),
                    'nltk=={}'.format(nltk.__version__),
                    'pandas=={}'.format(pd.__version__),
                    'numpy=={}'.format(np.__version__),
                    'scikit-learn=={}'.format(sklearn.__version__),
                    'tqdm=={}'.format(tqdm.__version__)
                ],
            },
        ],
        'name': 'nlp_keras_env'
    }

    # Save and register the MLflow Model
    with mlflow.start_run(run_name=run_name) as run:
        mlflow.pyfunc.save_model(
            path=mlflow_pyfunc_model_path,
            python_model=Model(),
            conda_env=conda_env,
            artifacts=artifacts)

        result = mlflow.register_model(
            f"runs:/{run.info.run_id}/{mlflow_pyfunc_model_path}",
            f"{mlflow_pyfunc_model_path}"
        )
```
